如何在Springboot中使用Redis5的Stream

您所在的位置:网站首页 stringredistemplate api 如何在Springboot中使用Redis5的Stream

如何在Springboot中使用Redis5的Stream

2024-01-28 09:09| 来源: 网络整理| 查看: 265

如何在Springboot中使用Redis5的Stream 关于Stream redis实现mq的方案以及stream的应用 技术交流 背景 使用redis做mq已经不是什么新的技术方案了,各路技术大牛各显神通,奈何实现却良莠不齐,这也导致redis的作者看不下去这种乱象,因此基于redis的代码框架写了一个分布式的mq服务:disque。 岂料disque生不逢时,大家似乎对使用它的热情并没有redis那么强烈,这个项目因此也逐渐处于不维护的状态,虽然redis的作者信誓旦旦说将来要把disque作为redis的一个modu…

一句话概括:Redis5的新数据类型,功能就是MQ。可以生产消息,消费消息。支持群组消费,以及消息确认。

在理解了Stream后,就可以继续往下看

SpringBoot整合

只需要整合进Redis就行。

POM.xml

springboot2默认使用lettuce作为客户端

org.springframework.boot spring-boot-starter-data-redis org.apache.commons commons-pool2 配置 spring: redis: database: 0 host: 192.168.1.103 port: 6379 password: "123456" timeout: 2000 lettuce: pool: max-active: 8 max-wait: -1 max-idle: 8 min-idle: 0 消息 和 消息ID的对象

我觉得要先说一下,这两个对象。因为以下的内容,都需要跟这两个对象打交道

消息对象的创建

使用 StreamRecords 的静态方法来创建消息实例。 一个stream消息有两个内容。可以理解为:一个是key,一个是value。 key和value都可以使用自定义的对象,字节,字符串来定义

ByteRecord rawBytes(Map raw) ByteBufferRecord rawBuffer(Map raw) StringRecord string(Map raw) MapRecord mapBacked(Map map) ObjectRecord objectBacked(V value) RecordBuilder newRecord() // 通过builder方式来创建消息 RecordId 表示消息ID

你读过上面的帖子,就会知道。一条消息的ID是唯一的。并且有2部分组成

// ----------- 读取ID属性的实例方法 // 是否是系统自动生成的 boolean shouldBeAutoGenerated(); // 获取原始的id字符串 String getValue(); // 获取序列号部分 long getSequence(); // 获取时间戳部分 long getTimestamp(); // ----------- 创建ID的静态方法 RecordId of(@Nullable String value) RecordId of(long millisecondsTime, long sequenceNumber) RecordId autoGenerate() 往Stream推送消息 使用RedisTemplate @Autowired private StringRedisTemplate stringRedisTemplate; public void test () { // 创建消息记录, 以及指定stream StringRecord stringRecord = StreamRecords.string(Collections.singletonMap("name", "KevinBlandy")).withStreamKey("mystream"); RecordId recordId = this.stringRedisTemplate.opsForStream().add(stringRecord); // 是否是自动生成的 boolean autoGenerated = recordId.shouldBeAutoGenerated(); // id值 String value = recordId.getValue(); // 序列号部分 long sequence = recordId.getSequence(); // 时间戳部分 long timestamp = recordId.getTimestamp(); } 使用RedisConnection @Autowired private RedisConnectionFactory redisConnectionFactory; public void test () { // 创建消息记录, 以及指定stream ByteRecord byteRecord = StreamRecords.rawBytes(Collections.singletonMap("name".getBytes(), "KevinBlandy".getBytes())).withStreamKey("mystream".getBytes()); // 获取连接 RedisConnection redisConnection = this.redisConnectionFactory.getConnection(); RecordId recordId = redisConnection.xAdd(byteRecord); // 是否是自动生成的 boolean autoGenerated = recordId.shouldBeAutoGenerated(); // id值 String value = recordId.getValue(); // 序列号部分 long sequence = recordId.getSequence(); // 时间戳部分 long timestamp = recordId.getTimestamp(); } 从Stream消费消息 阻塞消费 StreamConsumerRunner

使用 ApplicationRnner,在系统启动以后,初始化监听器。开始监听消费。

import java.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ErrorHandler; @Component public class StreamConsumerRunner implements ApplicationRunner, DisposableBean { static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumerRunner.class); @Value("${redis.stream.consumer}") private String consumer; @Autowired RedisConnectionFactory redisConnectionFactory; @Autowired ThreadPoolTaskExecutor threadPoolTaskExecutor; @Autowired StreamMessageListener streamMessageListener; @Autowired StringRedisTemplate stringRedisTemplate; private StreamMessageListenerContainer streamMessageListenerContainer; @Override public void run(ApplicationArguments args) throws Exception { // 创建配置对象 StreamMessageListenerContainerOptions streamMessageListenerContainerOptions = StreamMessageListenerContainerOptions .builder() // 一次性最多拉取多少条消息 .batchSize(10) // 执行消息轮询的执行器 .executor(this.threadPoolTaskExecutor) // 消息消费异常的handler .errorHandler(new ErrorHandler() { @Override public void handleError(Throwable t) { // throw new RuntimeException(t); t.printStackTrace(); } }) // 超时时间,设置为0,表示不超时(超时后会抛出异常) .pollTimeout(Duration.ZERO) // 序列化器 .serializer(new StringRedisSerializer()) .build(); // 根据配置对象创建监听容器对象 StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer .create(this.redisConnectionFactory, streamMessageListenerContainerOptions); // 使用监听容器对象开始监听消费(使用的是手动确认方式) streamMessageListenerContainer.receive(Consumer.from("group-1", "consumer-1"), StreamOffset.create("mystream", ReadOffset.lastConsumed()), this.streamMessageListener); this.streamMessageListenerContainer = streamMessageListenerContainer; // 启动监听 this.streamMessageListenerContainer.start(); } @Override public void destroy() throws Exception { this.streamMessageListenerContainer.stop(); } } StreamMessageListener

实现函数接口 StreamListener ,来自定义消息的消费实现

import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.stream.StreamListener; import org.springframework.stereotype.Component; @Component public class StreamMessageListener implements StreamListener{ static final Logger LOGGER = LoggerFactory.getLogger(StreamMessageListener.class); @Autowired StringRedisTemplate stringRedisTemplate; @Override public void onMessage(MapRecord message) { // 消息ID RecordId messageId = message.getId(); // 消息的key和value Map body = message.getValue(); LOGGER.info("stream message。messageId={}, stream={}, body={}", messageId, message.getStream(), body); // 通过RedisTemplate手动确认消息 this.stringRedisTemplate.opsForStream().acknowledge("mystream", message); } } 非阻塞消费

主要是通过StreamOperations 或者是 RedicConnection 的消费API来进行消息的随机消费

StreamOperations 中,关于读取操作的API

从RedisTemplate中获取到StreamOperations

StreamOperations s = this.stringRedisTemplate.opsForStream();

StreamOperations 的读取 API

// 随机范围读取 List range(Class targetType, K key, Range range) List range(Class targetType, K key, Range range, Limit limit) // 根据消息ID或者偏移量读取 List read(StreamOffset... streams) List read(Class targetType, StreamOffset... streams) List read(StreamReadOptions readOptions, StreamOffset... streams) List read(Class targetType, StreamReadOptions readOptions, StreamOffset... streams) List read(Consumer consumer, StreamOffset... streams) List read(Class targetType, Consumer consumer, StreamOffset... streams) List read(Consumer consumer, StreamReadOptions readOptions, StreamOffset... streams) List read(Class targetType, Consumer consumer, StreamReadOptions readOptions, StreamOffset... streams) // 随机逆向范围读取 List reverseRange(K key, Range range) List reverseRange(K key, Range range, Limit limit) List reverseRange(Class targetType, K key, Range range) List reverseRange(Class targetType, K key, Range range, Limit limit) // 消费者信息 XInfoConsumers consumers(K key, String group); // 消费者信息 XInfoGroups groups(K key); // stream信息 XInfoStream info(K key); // 获取消费组,消费者中未确认的消息 PendingMessagesSummary pending(K key, String group); PendingMessages pending(K key, Consumer consumer) PendingMessages pending(K key, String group, Range range, long count) PendingMessages pending(K key, String group, Range range, long count) 测试 先通过Redis控制台创建stream以及group。 127.0.0.1:6379> XADD mystream * hello world "1583208428680-0" 127.0.0.1:6379> XGROUP CREATE mystream group-1 $ OK 启动程序后,通过控制台往stream生产消息 127.0.0.1:6379> XADD mystream * name KevinBlandy "1583208571017-0"

程序成功的消费了这条消息

2020-03-03 12:09:34.159 INFO 9344 --- [lTaskExecutor-1] i.s.c.r.stream.StreamMessageListener : stream message。messageId=1583208571017-0, stream=mystream, body={name=KevinBlandy} 最后

对于Streram还有一些其他的操作。例如:通过RedisTemplate来发送消息,以及查看未ACK的消息,重新消费等等。 在这里没有一一列举。其实你如果学懂了Stream,那么我觉得这些API连蒙带猜也都知道是怎么用的。水到渠成的事儿,不难。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3